#coding:utf8

from pyspark.sql import *
import os
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as f

if __name__ == '__main__':
    # Word-count demo: the same aggregation expressed twice, once via Spark SQL
    # and once via the DataFrame DSL.
    spark=SparkSession.builder.appName("test11_wordcount_demo")\
        .master("local[*]").getOrCreate()
    sc=spark.sparkContext

    # todo 1: SQL-style processing
    # NOTE(review): without a "file://" scheme the path is resolved against the
    # configured default filesystem. That works in local mode; on a cluster
    # whose default FS is HDFS, the commented variant below is needed — confirm
    # for the target deployment.
    # localhost_path ="file://"+os.getcwd()+"/../data/input/words.txt"
    localhost_path =os.getcwd()+"/../data/input/words.txt"
    # Split each line on spaces, then wrap every word in a one-element list so
    # each RDD row maps onto a single-column ("word") DataFrame row.
    rdd1=sc.textFile(localhost_path).flatMap(lambda line:line.split(" "))\
        .map(lambda word:[word])
    df=rdd1.toDF(["word"])
    df.createOrReplaceTempView("words")

    # Count occurrences per word, most frequent first.
    spark.sql("""
    select word,count(*) ct
    from words
    group by word
    order by ct desc 
    """).show()

    # todo 2: DataFrame-DSL-style processing
    # spark.read.text yields one row per line in a single "value" column.
    df=spark.read.text(localhost_path)
    # withColumn with an existing column name replaces that column:
    # split(...) turns each line into an array of words, and explode(...)
    # fans the array out into one row per word.
    df.withColumn("value",f.explode(f.split(df["value"],' ')))\
        .groupby("value").count()\
        .withColumnRenamed("value","word")\
        .withColumnRenamed("count","ct")\
        .orderBy("ct",ascending=False).show()

    # Fix: release driver/executor resources — the original script never
    # stopped the SparkSession.
    spark.stop()







